The purpose of this project is to implement in Python a variable-selection procedure for the logit model. The procedure is a stepwise search that optimizes the prediction error estimated by cross-validation.
Let $\left(X_{1}^{\top}, Y_{1}\right), \ldots,\left(X_{n}^{\top}, Y_{n}\right)$ be observed independent copies of the random vector $\left(X^{\top}, Y\right)$, with $Y \in\{0,1\}$ and design matrix $$ \mathbf{X}=\left[\begin{array}{ccccc} 1 & x_{1,1} & x_{1,2} & \ldots & x_{1, p} \\ 1 & x_{2,1} & x_{2,2} & \ldots & x_{2, p} \\ \vdots & & & & \\ 1 & x_{n, 1} & x_{n, 2} & \ldots & x_{n, p} \end{array}\right] \in \mathbb{R}^{n \times(p+1)} . $$
The distribution of $Y$ given $X=x$ is assumed to be a logit model, such that
$$ \mathbb{P}(Y=1 \mid X=x)=\frac{e^{x^{\top} \beta}}{1+e^{x^{\top} \beta}} \quad \text{and} \quad \mathbb{P}(Y=0 \mid X=x)=1-\mathbb{P}(Y=1 \mid X=x) . $$

# Setup ----------------------
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
import pandas as pd
import time
import tqdm
%precision %.5f
import warnings
warnings.filterwarnings("ignore")
import dask
from dask import delayed
from dask.distributed import Client
import concurrent.futures
import plotly.graph_objects as go
from plotly.subplots import make_subplots
Here we define a function for generating data, and fit a logistic regression model with statsmodels.
# Define data generation function
n = 1000
def logit(x, beta):
    l = x.dot(beta)
    return 1/(1+np.exp(-l))
# Generate data from logit distribution
rng = np.random.RandomState(1234)
x1, x2 = rng.normal(size=n), rng.normal(size=n)
X = sm.add_constant(np.c_[x1, x2]) # add ones to the first column for bias
beta = [1., 2., -1.] # Choose some arbitrary coefficients
p = logit(X, beta)
y = rng.binomial(1, p) # generate the data with the seeded generator
#Plot Data
plt.gca().set_aspect("equal")
plt.scatter(x1, x2, c = y, marker = ".")
# Fit Logistic regression
model = sm.Logit(y, X).fit()
print(model.summary())
ypred = np.round(model.predict(X))
print(f'Training accuracy = {np.sum(y == ypred)/ y.size}')  # in-sample accuracy, since we predict on the fitted data
Optimization terminated successfully.
Current function value: 0.439218
Iterations 7
Logit Regression Results
==============================================================================
Dep. Variable: y No. Observations: 1000
Model: Logit Df Residuals: 997
Method: MLE Df Model: 2
Date: Wed, 19 Oct 2022 Pseudo R-squ.: 0.3307
Time: 23:08:20 Log-Likelihood: -439.22
converged: True LL-Null: -656.24
Covariance Type: nonrobust LLR p-value: 5.602e-95
==============================================================================
coef std err z P>|z| [0.025 0.975]
------------------------------------------------------------------------------
const 0.9034 0.091 9.931 0.000 0.725 1.082
x1 1.8763 0.130 14.456 0.000 1.622 2.131
x2 -0.8096 0.093 -8.708 0.000 -0.992 -0.627
==============================================================================
Training accuracy = 0.792
The following function takes the sample as input and returns the cross-validation estimate of the prediction error, using the leave-one-out strategy.
First we define a more complex data set, in order to better show the benefit of the parallel implementations: we have observed that parallel computation brings no gain for small tasks, because of its overhead.
rng = np.random.RandomState(1234)
n = 1000
beta = [0, 1, 0, -2, 3, 0, 6, -1, 0, 0]
X = rng.randn(n*len(beta)).reshape(n, len(beta))
p = logit(X, beta)
y = rng.binomial(1, p)
%%time
def cv(X, y):
    """Leave-one-out cross-validation accuracy for the logit model."""
    nobs = np.size(X, 0)
    score = 0
    indices = np.arange(nobs)
    for i in range(nobs):
        sapp = indices != i  # training indices
        sval = i             # validation index
        model = sm.Logit(y[sapp], X[sapp, :]).fit(disp=False)
        ypred = np.round(model.predict(X[sval, :]))
        score += y[sval] == ypred
    return (score / nobs)[0]
cv(X, y)
CPU times: user 1min 1s, sys: 1min 17s, total: 2min 18s Wall time: 12.4 s
0.92200
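As a sanity check on our `cv` implementation, a sketch of the same leave-one-out estimate with scikit-learn (an assumption here: scikit-learn is available in the environment, since it is not used elsewhere in this project). A small sample keeps the n model fits fast:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import LeaveOneOut, cross_val_score

rng = np.random.RandomState(1234)
n_small = 100
X_small = rng.randn(n_small, 3)
p_small = 1 / (1 + np.exp(-X_small @ np.array([1.0, -2.0, 3.0])))
y_small = rng.binomial(1, p_small)

# C is large so the ridge penalty is negligible, approximating the
# unpenalised MLE fitted by sm.Logit; no intercept column is added here
clf = LogisticRegression(C=1e6, fit_intercept=False, max_iter=1000)
loo_acc = cross_val_score(clf, X_small, y_small, cv=LeaveOneOut()).mean()
print(loo_acc)
```

Each `cross_val_score` fold holds out a single observation, exactly as the manual loop in `cv` does.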
%%time
client = Client(threads_per_worker=2, n_workers=4)
result1 = delayed(cv)(X,y)
print(result1.compute())
client.close()
result1.visualize()
0.922 CPU times: user 448 ms, sys: 350 ms, total: 798 ms Wall time: 4.39 s
Parallelizing the CV function using dask.delayed cut the wall time from 12.4 s to 4.4 s with 4 workers (roughly a 3x speedup). Let's see what happens with 8:
%%time
client = Client(threads_per_worker=4, n_workers=8)
result2 = delayed(cv)(X,y)
print(result2.compute())
client.close()
0.922 CPU times: user 360 ms, sys: 143 ms, total: 503 ms Wall time: 4.68 s
Increasing the number of workers does not always improve performance; it should be proportionate to the workload.
def generate_X_y(num_var: int = 20, num_obs: int = 1000) -> tuple:
    """Generate a random data set with num_var variables and num_obs observations.

    Args:
        num_var (int, optional): number of variables. Defaults to 20.
        num_obs (int, optional): number of observations. Defaults to 1000.

    Returns:
        tuple: the design matrix X and the response y
    """
    X = sm.add_constant(np.random.normal(size=(num_obs, num_var - 1)))
    beta = np.random.normal(size=num_var)
    p = logit(X, beta)
    y = np.random.binomial(1, p).reshape(num_obs, 1)
    return X, y
X_large,y_large=generate_X_y(20,2000)
%%time
cv(X_large, y_large)
CPU times: user 3min 13s, sys: 3min 55s, total: 7min 9s Wall time: 37.2 s
0.86000
Dask improves the speed by roughly a factor of 3 when the data set grows (37.2 s down to 12.3 s):
%%time
client = Client(threads_per_worker=2, n_workers=4)
result1 = delayed(cv)(X_large,y_large)
print(result1.compute())
client.close()
result1.visualize()
0.86 CPU times: user 561 ms, sys: 50.5 ms, total: 612 ms Wall time: 12.3 s
def cv_iter(i, X, y):
    """Fit on all observations except i and score the held-out prediction."""
    indices = np.arange(np.size(X, 0))
    model = sm.Logit(y[indices != i], X[indices != i, :]).fit(disp=False)
    ypred = np.round(model.predict(X[i, :]))
    return y[i] == ypred

def cv_thread_process(X: np.ndarray, y: np.ndarray, worker: int, Thread: bool = True) -> float:
    """Parallelise the leave-one-out CV with threads or processes from concurrent.futures.

    Args:
        X (np.ndarray): X matrix
        y (np.ndarray): y vector
        worker (int): number of workers
        Thread (bool, optional): use threads if True, else processes. Defaults to True.

    Returns:
        float: the cross-validated accuracy
    """
    nobs = np.size(X, 0)
    # cv_iter must be a module-level function so that the process pool can pickle it
    Executor = (concurrent.futures.ThreadPoolExecutor if Thread
                else concurrent.futures.ProcessPoolExecutor)
    with Executor(max_workers=worker) as executor:
        results = list(executor.map(cv_iter, range(nobs), [X] * nobs, [y] * nobs))
    return sum(results)[0] / nobs
%%time
cv_thread_process(X,y, 12, Thread=True)
CPU times: user 53.1 s, sys: 1min 7s, total: 2min Wall time: 10.6 s
0.92100
%%time
cv_thread_process(X,y, 12, Thread=False)
CPU times: user 1min 10s, sys: 1min 29s, total: 2min 39s Wall time: 13.8 s
0.92100
Threads do not improve on the plain cv, and processes are even worse. concurrent.futures is not robust in this case.
To provide quantitative data on the parallel performance of our application, we measure parallel scaling with strong scaling: the run time for a fixed problem size as the number of workers increases.
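Concretely, strong scaling is summarised by the speedup $S(N)=T(1)/T(N)$ and the parallel efficiency $E(N)=S(N)/N$, where $T(N)$ is the run time with $N$ workers (the same ratios are computed from the timing tables further below). A minimal helper illustrating the two quantities:

```python
def speedup_and_efficiency(times):
    """Strong-scaling summary: times[k] is the run time with k+1 workers."""
    t1 = times[0]
    speedup = [t1 / t for t in times]                          # S(N) = T(1) / T(N)
    efficiency = [s / (k + 1) for k, s in enumerate(speedup)]  # E(N) = S(N) / N
    return speedup, efficiency

s, e = speedup_and_efficiency([10.0, 6.0, 5.0, 4.0])
print(s)  # [1.0, 1.6666666666666667, 2.0, 2.5]
print(e)  # [1.0, 0.8333333333333334, 0.6666666666666666, 0.625]
```

Ideal strong scaling would give $S(N)=N$, i.e. an efficiency of 1 for every worker count.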
class StrongScaling:
    """This class generates strong-scaling curves."""

    def __init__(self, core_range: list = list(range(1, 17))):
        self.core_range = core_range

    def strong_scaling(self, X: np.ndarray, y: np.ndarray, type_Thread: bool) -> list:
        """Compute the strong-scaling curve for an array X and y.

        Args:
            X (np.ndarray): matrix X
            y (np.ndarray): vector y
            type_Thread (bool): True for threads, False for processes

        Returns:
            list: history of run times as a function of the number of workers
        """
        time_historic = []
        for i in self.core_range:
            start = time.time()
            cv_thread_process(X, y, i, Thread=type_Thread)
            stop = time.time()
            time_historic.append(stop - start)
        return time_historic

    def strong_scaling_multiple(self, list_var: list, nobs: int) -> tuple:
        """Compute the strong-scaling curves for several X and y.

        Args:
            list_var (list): list of numbers of variables
            nobs (int): number of observations for each set

        Returns:
            tuple: dicts of run times for threads and for processes
        """
        dictionnary_thread_time = {}
        dictionnary_process_time = {}
        for var in tqdm.tqdm(list_var):
            X, y = generate_X_y(var, nobs)
            dictionnary_thread_time["variables " + str(var)] = self.strong_scaling(X, y, True)
            dictionnary_process_time["variables " + str(var)] = self.strong_scaling(X, y, False)
        return dictionnary_thread_time, dictionnary_process_time
dictionnary_thread_time,dictionnary_process_time=StrongScaling().strong_scaling_multiple([10,20,50],1000)
100%|██████████| 3/3 [1:25:18<00:00, 1706.11s/it]
thread_curves=pd.DataFrame(dictionnary_thread_time,index=range(1,17))
process_curves=pd.DataFrame(dictionnary_process_time,index=range(1,17))
process_curves
|   | variables 10 | variables 20 | variables 50 |
|---|---|---|---|
| 1 | 9.611544 | 13.168691 | 83.590250 |
| 2 | 9.242544 | 12.605175 | 86.540803 |
| 3 | 9.499396 | 13.676839 | 83.119842 |
| 4 | 9.979800 | 13.934739 | 83.091265 |
| 5 | 10.638895 | 13.588097 | 81.601782 |
| 6 | 7.866486 | 13.565158 | 79.976513 |
| 7 | 8.278714 | 12.855550 | 78.525045 |
| 8 | 8.465188 | 10.759225 | 85.075973 |
| 9 | 8.564870 | 13.046129 | 84.761789 |
| 10 | 8.167425 | 14.140499 | 78.315940 |
| 11 | 9.260344 | 12.608719 | 85.603778 |
| 12 | 8.612048 | 12.847224 | 83.710822 |
| 13 | 8.846300 | 13.996606 | 84.438727 |
| 14 | 9.309823 | 13.365880 | 81.801426 |
| 15 | 8.504881 | 14.180995 | 79.193812 |
| 16 | 8.817877 | 14.696520 | 78.187195 |
thread_curves
|   | variables 10 | variables 20 | variables 50 |
|---|---|---|---|
| 1 | 9.334562 | 11.107100 | 76.253473 |
| 2 | 7.210241 | 10.333477 | 83.459504 |
| 3 | 7.351614 | 14.607921 | 112.085886 |
| 4 | 7.499275 | 20.258789 | 144.727232 |
| 5 | 7.835736 | 17.612512 | 166.841285 |
| 6 | 8.250704 | 21.094695 | 188.335552 |
| 7 | 8.268324 | 20.116830 | 197.721071 |
| 8 | 8.327990 | 16.965829 | 212.924021 |
| 9 | 8.362028 | 19.255391 | 216.905652 |
| 10 | 8.297633 | 16.294657 | 223.401685 |
| 11 | 8.522640 | 21.603396 | 224.033805 |
| 12 | 9.067732 | 15.333988 | 233.846651 |
| 13 | 8.251261 | 19.150719 | 233.832960 |
| 14 | 8.867811 | 20.318651 | 226.862237 |
| 15 | 9.855490 | 23.283908 | 239.062054 |
| 16 | 10.099325 | 20.082687 | 240.963753 |
fig = make_subplots(rows=3, cols=1,subplot_titles=('Run time for 10 variables', 'Run time for 20 variables','Run time for 50 variables'),x_title="Number of workers",
y_title="Run time in seconds")
fig.append_trace(go.Scatter(
x=process_curves.index,
y=process_curves[process_curves.columns[0]],name=process_curves.columns[0],line_shape='spline'
), row=1, col=1)
fig.append_trace(go.Scatter(
x=process_curves.index,
y=process_curves[process_curves.columns[1]],name=process_curves.columns[1],line_shape='spline'
), row=2, col=1)
fig.append_trace(go.Scatter(
x=process_curves.index,
y=process_curves[process_curves.columns[2]],name=process_curves.columns[2],line_shape='spline'
), row=3, col=1)
fig.update_layout(height=800, width=800, title_text="Run time Process",title_x=0.5,legend=dict(y=0.5))
fig.show(renderer='notebook')
fig = make_subplots(rows=3, cols=1,subplot_titles=('Run time for 10 variables', 'Run time for 20 variables','Run time for 50 variables'),x_title="Number of workers",
y_title="Run time in seconds")
fig.append_trace(go.Scatter(
x=thread_curves.index,
y=thread_curves[thread_curves.columns[0]],name=thread_curves.columns[0],line_shape='spline'
), row=1, col=1)
fig.append_trace(go.Scatter(
x=thread_curves.index,
y=thread_curves[thread_curves.columns[1]],name=thread_curves.columns[1],line_shape='spline'
), row=2, col=1)
fig.append_trace(go.Scatter(
x=thread_curves.index,
y=thread_curves[thread_curves.columns[2]],name=thread_curves.columns[2],line_shape='spline'
), row=3, col=1)
fig.update_layout(height=800, width=800, title_text="Run time Thread",title_x=0.5,legend=dict(y=0.5))
fig.show(renderer='notebook')
speed_up_thread=thread_curves.iloc[0,:]/thread_curves
speed_up_process=process_curves.iloc[0,:]/process_curves
fig = make_subplots(rows=3, cols=1,subplot_titles=('Speed up for 10 variables', 'Speed up for 20 variables','Speed up for 50 variables'),x_title="Number of workers",
y_title="Speed up")
fig.append_trace(go.Scatter(
x=speed_up_process.index,
y=speed_up_process[speed_up_process.columns[0]],name=speed_up_process.columns[0],line_shape='spline'
), row=1, col=1)
fig.append_trace(go.Scatter(
x=speed_up_process.index,
y=speed_up_process[speed_up_process.columns[1]],name=speed_up_process.columns[1],line_shape='spline'
), row=2, col=1)
fig.append_trace(go.Scatter(
x=speed_up_process.index,
y=speed_up_process[speed_up_process.columns[2]],name=speed_up_process.columns[2],line_shape='spline'
), row=3, col=1)
fig.update_layout(height=800, width=800, title_text="Strong scaling Process",title_x=0.5,legend=dict(y=0.5))
fig.show(renderer='notebook')
fig = make_subplots(rows=3, cols=1,subplot_titles=('Speed up for 10 variables', 'Speed up for 20 variables','Speed up for 50 variables'),x_title="Number of workers",
y_title="Speed up")
fig.append_trace(go.Scatter(
x=speed_up_thread.index,
y=speed_up_thread[speed_up_thread.columns[0]],name=speed_up_thread.columns[0],line_shape='spline'
), row=1, col=1)
fig.append_trace(go.Scatter(
x=speed_up_thread.index,
y=speed_up_thread[speed_up_thread.columns[1]],name=speed_up_thread.columns[1],line_shape='spline'
), row=2, col=1)
fig.append_trace(go.Scatter(
x=speed_up_thread.index,
y=speed_up_thread[speed_up_thread.columns[2]],name=speed_up_thread.columns[2],line_shape='spline'
), row=3, col=1)
fig.update_layout(height=800, width=800, title_text="Strong scaling Thread",title_x=0.5,legend=dict(y=0.5))
fig.show(renderer='notebook')
The run-time results are not robust: sometimes they improve, sometimes they do not. In this case concurrent.futures is not the best way to reduce run time.
Processes can beat threads when the data set is larger, because separate processes are not limited by Python's GIL.
The strategy we used in this project is forward selection, which we implemented with cross-validation.
We first show the traditional approach based on p-values, as a consistency check.
%%time
rng = np.random.RandomState(1234)
def forward_regression(X, y, threshold_in=0.01):
    """Forward selection based on the p-value of the candidate variable."""
    included = []
    nvars = np.size(X, 1)
    while True:
        changed = False
        excluded = list(set(range(nvars)) - set(included))
        new_pval = np.full(nvars, np.inf)
        for new_column in excluded:
            test = included + [new_column]
            X1 = sm.add_constant(X[:, test])
            model = sm.Logit(y, X1).fit(disp=False)
            new_pval[new_column] = model.pvalues[-1]  # p-value of the newly added variable
        best_pval = new_pval.min()
        if best_pval < threshold_in:
            best_feature = np.argmin(new_pval)
            included.append(best_feature)
            changed = True
        if not changed:
            return included
forward_regression(X, y)
CPU times: user 330 ms, sys: 198 ms, total: 528 ms Wall time: 116 ms
[6, 4, 3, 1, 7]
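As a consistency check, the selected indices match exactly the support of the true coefficient vector used to generate this data set:

```python
beta = [0, 1, 0, -2, 3, 0, 6, -1, 0, 0]   # true coefficients from the generation step
true_support = {i for i, b in enumerate(beta) if b != 0}
selected = [6, 4, 3, 1, 7]                # result of forward_regression above
print(set(selected) == true_support)      # True
```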
This is our approach. The function iterates over all candidate models, adding one new variable at a time and computing each model's error rate. The variable yielding the smallest error rate is selected, and the iteration continues with it included, until adding another variable no longer improves the smallest error rate found so far.
%%time
rng = np.random.RandomState(1234)
def forward_regression_cv(X: np.ndarray, y: np.ndarray) -> list:
    """Forward selection with cross-validation.

    Args:
        X (np.ndarray): X matrix
        y (np.ndarray): y vector

    Returns:
        list: list of selected variables
    """
    included = []
    nvars = np.size(X, 1)
    accuracies = []
    while True:
        changed = False
        print(f'Computing CV with subset {included} ...')
        excluded = list(set(range(nvars)) - set(included))
        new_acc = np.full(nvars, np.inf)
        for new_column in excluded:
            test = included + [new_column]
            X1 = sm.add_constant(X[:, test])
            new_acc[new_column] = 1 - cv(X1, y)  # error rate
        best_acc = new_acc.min()
        accuracies.append(best_acc)
        # We need to tell the algorithm when to stop including variables.
        # After including half of the variables, the best accuracy should be met soon,
        # so we stop as soon as a new variable no longer improves the best error
        # rate recorded so far.
        if len(accuracies) > nvars / 2:
            if best_acc < min(accuracies[:-1]):
                best_feature = np.argmin(new_acc)
                included.append(best_feature)
                changed = True
            if not changed:
                acc = 1 - best_acc
                print(f'Including other variables after {included}, does not improve accuracy of {acc*100}%')
                return included
        else:
            # The mean is infinite while variables remain excluded, so during the
            # first half we always include the best candidate.
            if best_acc < np.mean(new_acc):
                best_feature = np.argmin(new_acc)
                included.append(best_feature)
                changed = True
            if not changed:
                return included
forward_regression_cv(X, y)
Computing CV with subset [] ...
Computing CV with subset [6] ...
Computing CV with subset [6, 4] ...
Computing CV with subset [6, 4, 3] ...
Computing CV with subset [6, 4, 3, 7] ...
Computing CV with subset [6, 4, 3, 7, 1] ...
Including other variables after [6, 4, 3, 7, 1], does not improve accuracy of 92.60000000000001%
CPU times: user 5min 12s, sys: 3min 38s, total: 8min 51s
Wall time: 1min 45s
%%time
client = Client(threads_per_worker=2, n_workers=4)
print(delayed(forward_regression_cv)(X, y).compute())
client.close()
Computing CV with subset [] ...
Computing CV with subset [6] ...
Computing CV with subset [6, 4] ...
Computing CV with subset [6, 4, 3] ...
Computing CV with subset [6, 4, 3, 7] ...
Computing CV with subset [6, 4, 3, 7, 1] ...
Including other variables after [6, 4, 3, 7, 1], does not improve accuracy of 92.60000000000001%
None
CPU times: user 1.83 s, sys: 1.03 s, total: 2.86 s
Wall time: 1min 21s
def forward_regression_cv_concurrent(X: np.ndarray, y: np.ndarray, workers: int, bool_thread: bool) -> list:
    """Forward selection with cross-validation, parallelised with concurrent.futures.

    Args:
        X (np.ndarray): X matrix
        y (np.ndarray): y vector
        workers (int): number of workers
        bool_thread (bool): True for threads, False for processes

    Returns:
        list: list of selected variables
    """
    included = []
    nvars = np.size(X, 1)
    accuracies = []
    while True:
        changed = False
        print(f'Computing CV with subset {included} ...')
        excluded = list(set(range(nvars)) - set(included))
        new_acc = np.full(nvars, np.inf)
        for new_column in excluded:
            test = included + [new_column]
            X1 = sm.add_constant(X[:, test])
            new_acc[new_column] = 1 - cv_thread_process(X1, y, worker=workers, Thread=bool_thread)  # error rate
        best_acc = new_acc.min()
        accuracies.append(best_acc)
        # After including half of the variables, stop as soon as a new variable
        # no longer improves the best error rate recorded so far.
        if len(accuracies) > nvars / 2:
            if best_acc < min(accuracies[:-1]):
                best_feature = np.argmin(new_acc)
                included.append(best_feature)
                changed = True
            if not changed:
                acc = 1 - best_acc
                print(f'Including other variables after {included}, does not improve accuracy of {acc*100}%')
                return included
        else:
            if best_acc < np.mean(new_acc):
                best_feature = np.argmin(new_acc)
                included.append(best_feature)
                changed = True
            if not changed:
                return included
%%time
forward_regression_cv_concurrent(X, y,12,True) #with Thread
Computing CV with subset [] ...
Computing CV with subset [6] ...
Computing CV with subset [6, 4] ...
Computing CV with subset [6, 4, 3] ...
Computing CV with subset [6, 4, 3, 7] ...
Computing CV with subset [6, 4, 3, 7, 1] ...
Including other variables after [6, 4, 3, 7, 1], does not improve accuracy of 92.5%
CPU times: user 11min 21s, sys: 8min 49s, total: 20min 11s
Wall time: 3min 41s
%%time
forward_regression_cv_concurrent(X, y,12,False) #with process
Computing CV with subset [] ...
Computing CV with subset [6] ...
Computing CV with subset [6, 4] ...
Computing CV with subset [6, 4, 3] ...
Computing CV with subset [6, 4, 3, 7] ...
Computing CV with subset [6, 4, 3, 7, 1] ...
Including other variables after [6, 4, 3, 7, 1], does not improve accuracy of 92.5%
CPU times: user 4min 8s, sys: 2min 51s, total: 7min
Wall time: 1min 29s
Processes are more efficient than threads, even for model selection.
Parallelisation with dask is easier to implement and gives better results than concurrent.futures; moreover, dask.delayed is more robust than concurrent.futures threads and processes.
The strong-scaling curves clearly show that parallelisation with threads and processes is not reliable here and involves a large amount of randomness.
For large data sets, processes seem more efficient than threads according to the strong-scaling curves: we sometimes observe a small improvement, but it is not really robust.
dask.delayed improved the speed of our code by a factor of about 3.